package com.yuanda.erp9.syn.execule.thread;

import com.yuanda.erp9.syn.enums.ThreadPoolTypeEnum;
import com.yuanda.erp9.syn.execule.factory.SimpleThreadPoolFactory;
import com.yuanda.erp9.syn.service.erp9.ESService;
import com.yuanda.erp9.syn.service.erp9.GoodsService;
import com.yuanda.erp9.syn.service.erp9.impl.ESServiceImpl;
import com.yuanda.erp9.syn.service.erp9.impl.GoodsServiceImpl;
import com.yuanda.erp9.syn.util.SpringContextUtil;
import com.yuanda.erp9.syn.util.SubListUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
 * @ClassName Master
 * @Description 主任务类，负责拆分任务，并调度worker执行.
 * @Date 2022/11/15
 * @Author myq
 */
@Slf4j
public class Master<T> {
    /**
     * 存放所有任务的容器
     */
    private ConcurrentLinkedDeque<T> deque = new ConcurrentLinkedDeque<>();

    /**
     * 存储返回值
     */
    private ConcurrentHashMap<String, Object> resMap = new ConcurrentHashMap<>();

    /**
     * 是否开始创建子任务执行
     */
    public volatile boolean isBegin = false;

    /**
     * 是否上一个文件执行完
     */
    public static boolean isFile = false;
    /**
     * 子任务最大接受数量（每个子任务执行的任务最大次数）
     */
    private final int workTaskMaxSize = 3000;

    /**
     * 单例的线程池
     */
    ThreadPoolTaskExecutor executor;

    /**
     * 无参构造器
     */
    public Master() {
        //创建线程池
        executor = SimpleThreadPoolFactory.choose(ThreadPoolTypeEnum.SPECIAL_PURPOSE);
    }

    /**
     * @Description: 拆分数据并启动子线程执行任务，专门处理hispSet
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2023/1/515:08
     */
    public void setAndBegin(Integer supplier, List<T> data, Set<String> hispSet, Integer source) {
        if (CollectionUtils.isEmpty(data)) {
            return;
        }
        // 删除所有上次添加的数据
        getRsGoodIdsAndClear(supplier, hispSet, source);
        // 设置属性
        setMysqlInsertProperty();
        // 子任务集合
        List<List<T>> lists = SubListUtil.splitList(data, 3000);
        for (List<T> list : lists) {
            try {
                Thread.sleep(300);
                working(list, executor);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        isCompleted(executor);
        resetMysqlInsertProperyt();
        System.gc();
    }

    /**
     * @Description:
     * @Params:
     * @Return:
     * @Author: Mr.swq
     * @Date: 2023/1/515:26
     */
    public void getRsGoodIdsAndClear(Integer supplier, Set<String> hispSet, Integer source) {
        GoodsService goodsService = getBean();
        boolean flag = true;
        while (flag) {
            List<String> strings = goodsService.queryRsDeleteIds(supplier, hispSet, source);
            if (!StringUtils.isEmpty(strings) && strings.size() > 0) {//如果数据不空那么就一直删除
                this.deleteAllOfBefore(strings);
            } else {
                flag = false;
            }
        }
    }

    /**
     * @Description: 构造器（已知供应商ID 和 hisp字段）
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2023/1/515:02
     */
    public Master(Integer supplierId, String hisp, Integer source) {
        //创建线程池
        executor = SimpleThreadPoolFactory.choose(ThreadPoolTypeEnum.SPECIAL_PURPOSE);
        this.isBegin = true;
        long s = System.currentTimeMillis();
        // 删除所有上次添加的数据
        builderQueryAndDeleteOnlyES(supplierId, hisp, source);
        // 设置属性
        setMysqlInsertProperty();
        // 创建线程并执行start方法之前，执行delayStart方法
        new Thread(this::distributionTask, "Master-Batch-Insert-Stared").start();
    }

    /**
     * avnet 多文件解析时需要根据地区文件删除 并解析
     *
     * @param supplierId    供应商id
     * @param hisp
     * @param source        数据来源
     * @param repertoryArea 数据所属地区
     * @Description: 构造器
     */
    public Master(Integer supplierId, String hisp, Integer source, String repertoryArea) {
        //创建线程池
        executor = SimpleThreadPoolFactory.choose(ThreadPoolTypeEnum.SPECIAL_PURPOSE);
        this.isBegin = true;
        long s = System.currentTimeMillis();
        // 删除所有上次添加的数据
        DeleteOnlyES(supplierId, hisp, source, repertoryArea);
        // 设置属性
        setMysqlInsertProperty();
        // 创建线程并执行start方法之前，执行delayStart方法
        new Thread(this::distributionTask, "Master-Batch-Insert-Stared").start();
    }


    /**
     * @Description: 添加任务
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2022/11/1515:56
     */
    public void add(T task) {
        Assert.isTrue(deque.offer(task), "队列已满无法添加数据");
    }

    /**
     * @Description: 结束标志
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2023/1/515:18
     */
    public void isCompleted() {
        this.isBegin = false;
    }

    /**
     * @Description: 拆分任务，并创建子线程去执行
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2022/12/279:43
     */
    public void distributionTask() {
        List<T> tempList = new ArrayList<>();
        // 延时启动后
        while (isBegin) {
            // 获取队列里面的数据
            T poll = deque.poll();
            if (poll != null) {
                tempList.add(poll);
                // 满足创建子任务条件
                if (tempList.size() >= workTaskMaxSize) {
                    working(tempList, executor);
                    tempList.clear();
                }
            }
        }

        // 结束标志
        if (!isBegin) {
            // 获取队列里面的数据
            while (deque.size() > 0) {
                T poll = deque.poll();
                tempList.add(poll);
                if (tempList.size() >= workTaskMaxSize) {
                    working(tempList, executor);
                    tempList.clear();
                }
            }
            working(tempList, executor);
            tempList.clear();
        }

        isCompleted(executor);
        // 重置属性
        resetMysqlInsertProperyt();
        System.gc();
    }


    /**
     * @Description: 判断线程池中的线程是否执行完毕
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2022/12/2810:35
     */
    public void isCompleted(ThreadPoolTaskExecutor executor) {
        while (true) {
            if (executor.getActiveCount() <= 0) {
                log.debug(">>>>>>>>>>>>>>>>>>>>任务执行完毕<<<<<<<<<<<<<<<<<<<");
                break;
            }
        }
    }

    /**
     * @Description: 批量新增之后重置属性
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2022/12/2314:42
     */
    private void resetMysqlInsertProperyt() {
        GoodsService goodsService = getBean();
        goodsService.insertBatchBeforeSet();
    }

    /**
     * @Description: 批量新增之前设置属性
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2022/12/2314:41
     */
    private void setMysqlInsertProperty() {
        GoodsService goodsService = getBean();
        goodsService.insertBatchAfterReset();
    }

    /**
     * @Description: 子任务执行-并启动线程
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2022/11/1617:28
     */
    private void working(List<T> sonTaskList, ThreadPoolTaskExecutor executor) {
        ArrayList<T> mysqlList = new ArrayList<>(sonTaskList);
        AbstractWorker mysqlWorker = new MysqlWorker(mysqlList, resMap);
        executor.execute(mysqlWorker);

        ArrayList<T> esList = new ArrayList<>(sonTaskList);
        AbstractWorker esWorker = new ElasticSearchWorker(esList, resMap);
        executor.execute(esWorker);
    }

    /**
     * @Description: 删除上次数据
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2022/12/1915:20
     */
    private void deleteAllOfBefore(List<String> goodIds) {
        List<String> deleteIds = Collections.emptyList();
        try {
            if (CollectionUtils.isEmpty(goodIds)) {
                return;
            }
            List<String> mysqlIds = new ArrayList<>(goodIds);
            GoodsService goodsService = getBean();
            goodsService.deleteAll(mysqlIds);

            ESService esService = SpringContextUtil.getBean("ESServiceImpl", ESServiceImpl.class);
            ArrayList<String> esIds = new ArrayList<>(goodIds);
            esService.deleteAllByIds(esIds);
            log.info(">>>>>>>>>>>>>>>>>>>>>>>删除ES数据成功>>>>>>>>>>>>>>>>>>>>>>>");
        } finally {
            deleteIds.clear();
        }
    }

    /**
     * @Description:
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2023/1/515:26
     */
    public void getGoodIdsAndClear(Integer supplier, String hisp, Integer source) {
        GoodsService goodsService = getBean();
        boolean flag = true;
        while (flag) {
            List<String> strings = goodsService.queryDeleteIds(supplier, hisp, source);
            log.debug("需要删除的数据条目：" + strings.toString());
            //如果数据不空那么就一直删除
            if (!CollectionUtils.isEmpty(strings)) {
                this.deleteAllOfBefore(strings);
            } else {
                flag = false;
                log.debug("结束删除程序");
            }
        }
    }


    /**
     * @Description: 构建查看条件并删除数据
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2023/2/1415:52
     */
    public void builderQueryAndDeleteOnlyES(Integer supplier, String hisp, Integer source) {
        ESService esService = SpringContextUtil.getBean("ESServiceImpl", ESServiceImpl.class);
//        List<String> strings = esService.deleteByQueryBuilders(supplier, hisp, source);
//        if(!CollectionUtils.isEmpty(strings)) {
//            esService.deleteAllByIds(strings);
//        }
        esService.deleteBySupplierIdAndHispAndSource(supplier, hisp, source);
        log.info(">>>>>>>>>>>>>>>>>>>>>>>删除ES数据成功>>>>>>>>>>>>>>>>>>>>>>>");
    }

    /**
     * 删除es数据
     *
     * @param supplier
     * @param hisp
     * @param source
     * @param repertoryArea
     */
    public void DeleteOnlyES(Integer supplier, String hisp, Integer source, String repertoryArea) {
        ESService esService = SpringContextUtil.getBean("ESServiceImpl", ESServiceImpl.class);
        esService.deleteBySupplierIdAndHispAndSourceAndRepertoryArea(supplier, hisp, source, repertoryArea);
        log.info(">>>>>>>>>>>>>>>>>>>>>>>删除ES数据成功>>>>>>>>>>>>>>>>>>>>>>>");
    }


    /**
     * @Description: 返回GoodService
     * @Params:
     * @Return:
     * @Author: Mr.myq
     * @Date: 2022/12/2315:05
     */
    public GoodsService getBean() {
        return SpringContextUtil.getBean(GoodsService.class, "goodsServiceImpl", GoodsServiceImpl.class);
    }


}
